package com.heima.schedule.service;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.heima.common.cache.CacheService;
import com.heima.model.schedule.dto.Task;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;
import java.util.Set;

/**
 * Delayed-task service: persists tasks and their logs through the
 * {@code Taskinfo}/{@code TaskinfoLogs} services and mirrors the tasks that are
 * due (or due soon) into two cache structures, a list of due tasks and a
 * sorted set of upcoming tasks scored by execute time.
 *
 * @author itheima
 * @since 2022-07-18
 */
@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    /** Cache key of the list holding tasks whose execute time has been reached. */
    private static final String TOPIC_KEY = "TOPIC";

    /** Cache key of the sorted set holding upcoming tasks, scored by execute time. */
    private static final String FUTURE_KEY = "FUTURE";

    /** Tasks due within this many milliseconds from now are preloaded into the cache. */
    private static final long PRELOAD_WINDOW_MILLIS = 5 * 60 * 1000L;

    /** Lock name passed to {@code CacheService.tryLock} by {@link #refresh()}. */
    private static final String REFRESH_LOCK_NAME = "refresh_lock";

    /** Expiry passed to {@code CacheService.tryLock}; presumably milliseconds — TODO confirm. */
    private static final int REFRESH_LOCK_EXPIRE = 5000;

    /** Log status written when a task is first created. */
    private static final int STATUS_SCHEDULED = 0;

    /** Log status written when a task is handed out by {@link #pollTask()}. */
    private static final int STATUS_EXECUTED = 1;

    /** Log status written when a task is cancelled. */
    private static final int STATUS_CANCELLED = 2;

    @Autowired
    private TaskinfoService taskinfoService;

    @Autowired
    private TaskinfoLogsService taskinfoLogsService;

    @Autowired
    private CacheService cacheService;

    /**
     * Persists a new task and its log row, then places the task in the cache
     * if it is due now or within the preload window.
     *
     * @param task the task to add; its {@code taskId} is overwritten with the generated id
     * @return the id of the stored task, or {@code null} when {@code task} is
     *         {@code null} or the task row could not be saved
     * @throws RuntimeException if the log row cannot be saved (rolls the transaction back)
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Long addTask(Task task) {
        // 1. Validate the argument.
        if (task == null) {
            log.warn("入参不能为空");
            return null;
        }

        // 2. Insert into the taskinfo table.
        Taskinfo taskInfo = new Taskinfo();
        BeanUtils.copyProperties(task, taskInfo);
        taskInfo.setExecuteTime(new Date(task.getExecuteTime()));
        boolean taskInfoResult = taskinfoService.save(taskInfo);
        if (!taskInfoResult) {
            log.warn("任务插入失败");
            return null;
        }

        // 3. Insert into the taskinfo log table with the initial status.
        TaskinfoLogs logs = new TaskinfoLogs();
        BeanUtils.copyProperties(task, logs);
        logs.setTaskId(taskInfo.getTaskId());
        logs.setExecuteTime(new Date(task.getExecuteTime()));
        logs.setStatus(STATUS_SCHEDULED);
        boolean logsResult = taskinfoLogsService.save(logs);
        if (!logsResult) {
            log.warn("日志插入失败");
            throw new RuntimeException("日志插入失败");
        }

        // 4. Propagate the generated id back to the caller's object.
        task.setTaskId(taskInfo.getTaskId());

        // 5. Write to the cache.
        // NOTE(review): this runs before the transaction commits, so a commit
        // failure would leave the cache entry behind.
        addTaskToCache(task);

        // 6. Return the task id.
        return taskInfo.getTaskId();
    }

    /**
     * Routes a task into the cache according to its execute time: tasks that are
     * already due go to the due list, tasks due within the preload window go to
     * the upcoming sorted set (scored by execute time), later tasks are not cached.
     *
     * @param task the task to cache; serialized to JSON before being stored
     */
    public void addTaskToCache(Task task) {
        long executeTime = task.getExecuteTime();
        long currentTime = System.currentTimeMillis();
        long futureTime = currentTime + PRELOAD_WINDOW_MILLIS;

        if (executeTime <= currentTime) {
            // Already due: add to the execution list.
            cacheService.lLeftPush(TOPIC_KEY, JSON.toJSONString(task));
        } else if (executeTime <= futureTime) {
            // Due within the preload window: park it in the sorted set.
            cacheService.zAdd(FUTURE_KEY, JSON.toJSONString(task), executeTime);
        }
    }

    /**
     * Cancels a task: deletes its taskinfo row, marks its log row as cancelled
     * and removes its JSON form from both cache structures.
     *
     * @param taskId id of the task to cancel
     * @return {@code true} on success; {@code false} when {@code taskId} is
     *         {@code null} or the taskinfo row could not be deleted
     * @throws IllegalStateException if no log row exists for the task (rolls the transaction back)
     * @throws RuntimeException      if the log row cannot be updated (rolls the transaction back)
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Boolean cancelTask(Long taskId) {
        if (taskId == null) {
            log.warn("入参不能为空");
            return false;
        }

        // 1. Delete the taskinfo row.
        boolean taskInfoResult = taskinfoService.removeById(taskId);
        if (!taskInfoResult) {
            log.warn("任务删除失败");
            return false;
        }

        // 2. Mark the log row as cancelled.
        LambdaQueryWrapper<TaskinfoLogs> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TaskinfoLogs::getTaskId, taskId);
        TaskinfoLogs logs = taskinfoLogsService.getOne(wrapper);
        if (logs == null) {
            // Fail with an explicit, still-unchecked error instead of a bare NPE so
            // the taskinfo deletion above is rolled back with a clear reason.
            log.warn("未找到任务日志, taskId={}", taskId);
            throw new IllegalStateException("未找到任务日志, taskId=" + taskId);
        }
        logs.setStatus(STATUS_CANCELLED);
        boolean logResult = taskinfoLogsService.updateById(logs);
        if (!logResult) {
            log.warn("日志更新失败");
            throw new RuntimeException("日志更新失败");
        }

        // 3. Rebuild the Task as it was serialized into the cache.
        Task task = new Task();
        BeanUtils.copyProperties(logs, task);
        task.setExecuteTime(logs.getExecuteTime().getTime());

        // 4. Remove the entry from both the list and the sorted set; removal is
        // by value, so the JSON must match what was stored.
        String taskJson = JSON.toJSONString(task);
        cacheService.lRemove(TOPIC_KEY, 0, taskJson);
        cacheService.zRemove(FUTURE_KEY, taskJson);

        return true;
    }

    /**
     * Pops one due task from the cache list, deletes its taskinfo row and marks
     * its log row as executed.
     *
     * @return the popped task, or {@code null} when the list is empty, the entry
     *         cannot be parsed, or the taskinfo row could not be deleted
     * @throws RuntimeException if the log row cannot be updated (rolls the transaction back)
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class)
    public Task pollTask() {
        // 1. Pop an entry from the due list.
        String taskString = cacheService.lRightPop(TOPIC_KEY);
        if (StringUtils.isEmpty(taskString)) {
            log.warn("没有可执行的任务");
            return null;
        }

        // 2. Deserialize it into a Task.
        Task task = JSON.parseObject(taskString, Task.class);
        if (task == null) {
            log.warn("没有可执行的任务");
            return null;
        }

        // 3. Delete the taskinfo row.
        // NOTE(review): the entry has already been popped from the cache, so it is
        // not restored when this step or the next one fails.
        boolean taskInfoResult = taskinfoService.removeById(task.getTaskId());
        if (!taskInfoResult) {
            log.warn("删除任务失败");
            return null;
        }

        // 4. Mark the log row as executed.
        TaskinfoLogs logs = new TaskinfoLogs();
        BeanUtils.copyProperties(task, logs);
        logs.setExecuteTime(new Date(task.getExecuteTime()));
        logs.setStatus(STATUS_EXECUTED);
        boolean logResult = taskinfoLogsService.updateById(logs);
        if (!logResult) {
            log.warn("日志更新失败");
            throw new RuntimeException("日志更新失败");
        }

        // 5. Hand the task to the caller.
        return task;
    }

    /**
     * Moves every entry of the upcoming sorted set whose score is not later than
     * the current time into the due list. Does nothing when the refresh lock
     * cannot be obtained or when no entry is due.
     */
    // Cron field order: second minute hour day month.
    // "0/1" means: start at second 0 and run every second.
    //@Scheduled(fixedRate = 1000 * 60 * 5)
    public void refresh() {
        log.info("我被执行了");
        // The lock is never released explicitly here; presumably it expires on its
        // own after the given expiry — verify against CacheService.
        String lockToken = cacheService.tryLock(REFRESH_LOCK_NAME, REFRESH_LOCK_EXPIRE);
        if (StringUtils.isEmpty(lockToken)) {
            return;
        }

        Set<String> future = cacheService.zRangeByScore(FUTURE_KEY, 0, System.currentTimeMillis());
        if (CollectionUtils.isEmpty(future)) {
            log.warn("没有可移动的任务");
            return;
        }

        cacheService.refreshWithPipeline(FUTURE_KEY, TOPIC_KEY, future);
    }

    /**
     * Rebuilds the cache from the database: clears both cache structures, then
     * routes every stored task through {@link #addTaskToCache(Task)}. Rows without
     * an execute time are skipped so one bad row cannot abort the reload after
     * the cache has already been cleared.
     */
    //@Scheduled(fixedRate = 5 * 60 * 1000)
    public void initData() {
        // Clear the cache first.
        clear();

        List<Taskinfo> list = taskinfoService.list();
        if (CollectionUtils.isEmpty(list)) {
            log.warn("任务为空");
            return;
        }

        for (Taskinfo taskinfo : list) {
            if (taskinfo == null) {
                continue;
            }
            if (taskinfo.getExecuteTime() == null) {
                log.warn("任务执行时间为空, 跳过, taskId={}", taskinfo.getTaskId());
                continue;
            }

            Task task = new Task();
            BeanUtils.copyProperties(taskinfo, task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());

            // Same due/upcoming routing as for newly added tasks.
            addTaskToCache(task);
        }
    }

    /** Deletes both cache structures. */
    private void clear() {
        cacheService.delete(TOPIC_KEY);
        cacheService.delete(FUTURE_KEY);
    }
}
